package com.zhang.second.day03;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

/**
 * @title: KeyedProcessFunction定时器的使用
 * @author: zhang
 * @date: 2022/1/30 15:33
 */
public class Example8 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .socketTextStream("localhost", 9999)
                .keyBy(r -> 1)
                .process(new KeyedProcessFunction<Integer, String, String>() {
                    @Override
                    public void processElement(String value, KeyedProcessFunction<Integer, String, String>.Context ctx, Collector<String> out) throws Exception {
                        //获取事件到达process算子的时间
                        long currTs = ctx.timerService().currentProcessingTime();
                        out.collect("事件" + value + "到达了！到达时间为：" + new Timestamp(currTs));
                        //注册一个10s后的定时器
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<Integer, String, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发了！触发时间是：" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }
}
